Pythonセミナー メッセージブローカー編
Redis を使ってみよう
threadingやmultiprocessingモジュールは、シングルホストで実行されているプログラムに最適ですが、別のマシンで作業を実行する場合、またはローカルマシンのCPUを超えるスケールアップが必要な場合では、Pythonの拡張モジュールRQが便利です。
RQは、非常にシンプルでありながら強力なライブラリです。最初に、ライブラリを使用して関数とその引数をキューに入れます。 これは関数呼び出し表現を「ピクルス」し、Redisリストに追加されます。 ジョブのエンキューは最初のステップですが、まだ何も動作しません。 また、そのジョブキューをリッスンするためには、少なくともひとつのワーカーが必要です。
最初のステップは、コンピューターにRedisサーバーをインストールして実行するか、実行中のRedisサーバーにアクセスできるようにすることです。 その後、既存のコードに加えられた小さな変更はわずかです。 まず、RQキューのインスタンスを作成し、redis-pyライブラリからRedisサーバーのインスタンスを渡します。 次に、
q.enqueue(関数、関数の引数...)
を呼び出します。 enqueue()
メソッドは、最初の引数として関数を受け取り、ジョブが実際に実行されるときに、他の引数またはキーワード引数がその関数に渡されます。最後のステップは、いくつかのワーカーを起動することです。 RQは、デフォルトのキューでワーカーを実行するための便利なスクリプトを提供します。 ターミナルウィンドウで
rqworker
を実行するだけで、デフォルトのキューでリッスンしているワーカーが起動します。 現在の作業ディレクトリがスクリプトが存在する場所と同じであることを確認してください。別のキューをリッスンする場合は、rqworker queue_nameを実行すると、その名前のキューがリッスンされます。 RQの優れている点は、Redisに接続できる限り、必要な数の異なるマシンで必要な数のワーカーを実行できることです。 したがって、アプリケーションの成長に合わせてスケールアップするのは非常に簡単です。 RQバージョンのソースは次のとおりです。from loguru import logger
from stock_downloader import stock_download
from multiprocessing.pool import Pool
from redis import Redis
from rq import Queue
stock_symbols = ['AAPL', 'NVDA', 'MSFT', 'JNJ']
logger.remove()
logger.add('/tmp/stock_downloader.log')
def main():
q = Queue(connection=Redis(host='localhost', port=6379))
for symbol in stock_symbols:
q.enqueue(stock_download, symbol)
if __name__ == '__main__':
main()
ただし、PythonジョブキューソリューションはRQだけではありません。 RQは使いやすく、単純なユースケースを非常にうまく処理してくれますが、より高度なオプションが必要な場合は、Celeryなどの他のものを使用することもできます。